/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.streampark.flink.core

import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
import org.apache.streampark.common.util.Logger
import enumeratum.EnumEntry

import java.lang.{Boolean => JavaBool}
import java.util.Scanner
import java.util.regex.{Matcher, Pattern}
import scala.collection.immutable
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.{break, breakable}

object SqlCommandParser extends Logger {

  def parseSQL(sql: String, validationCallback: FlinkSqlValidationResult => Unit = null): List[SqlCommandCall] = {
    val sqlEmptyError = "verify failed: flink sql cannot be empty."
    require(sql != null && sql.trim.nonEmpty, sqlEmptyError)
    val sqlSegments = SqlSplitter.splitSql(sql)
    sqlSegments match {
      case s if s.isEmpty =>
        if (validationCallback != null) {
          validationCallback(
            FlinkSqlValidationResult(
              success = false,
              failedType = FlinkSqlValidationFailedType.VERIFY_FAILED,
              exception = sqlEmptyError
            )
          )
          null
        } else {
          throw new IllegalArgumentException(sqlEmptyError)
        }
      case segments =>
        val calls = new ListBuffer[SqlCommandCall]
        for (segment <- segments) {
          parseLine(segment) match {
            case Some(x) => calls += x
            case _ =>
              if (validationCallback != null) {
                validationCallback(
                  FlinkSqlValidationResult(
                    success = false,
                    failedType = FlinkSqlValidationFailedType.UNSUPPORTED_SQL,
                    lineStart = segment.start,
                    lineEnd = segment.end,
                    exception = s"unsupported sql",
                    sql = segment.sql
                  )
                )
              } else {
                throw new UnsupportedOperationException(s"unsupported sql: ${segment.sql}")
              }
          }
        }

        calls.toList match {
          case c if c.isEmpty =>
            if (validationCallback != null) {
              validationCallback(
                FlinkSqlValidationResult(
                  success = false,
                  failedType = FlinkSqlValidationFailedType.VERIFY_FAILED,
                  exception = "flink sql syntax error, no executable sql"
                )
              )
              null
            } else {
              throw new UnsupportedOperationException("flink sql syntax error, no executable sql")
            }
          case r => r.sortWith((a, b) => a.lineStart < b.lineStart)
        }
    }
  }

  private[this] def parseLine(sqlSegment: SqlSegment): Option[SqlCommandCall] = {
    val sqlCommand = SqlCommand.get(sqlSegment.sql.trim)
    if (sqlCommand == null) None else {
      val matcher = sqlCommand.matcher
      val groups = new Array[String](matcher.groupCount)
      for (i <- groups.indices) {
        groups(i) = matcher.group(i + 1)
      }
      sqlCommand.converter(groups).map(x =>
        SqlCommandCall(sqlSegment.start, sqlSegment.end, sqlCommand, x, sqlSegment.sql.trim)
      )
    }
  }

}

object Converters {
  val NO_OPERANDS = (_: Array[String]) => Some(Array.empty[String])
}

sealed abstract class SqlCommand(
                                  val name: String,
                                  private val regex: String,
                                  val converter: Array[String] => Option[Array[String]] = (x: Array[String]) => Some(Array[String](x.head))
                                ) extends EnumEntry {
  var matcher: Matcher = _

  def matches(input: String): Boolean = {
    if (regex == null) false else {
      val pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
      matcher = pattern.matcher(input)
      matcher.matches()
    }
  }
}

object SqlCommand extends enumeratum.Enum[SqlCommand] {

  def get(stmt: String): SqlCommand = {
    var cmd: SqlCommand = null
    breakable {
      this.values.foreach(x => {
        if (x.matches(stmt)) {
          cmd = x
          break()
        }
      })
    }
    cmd
  }

  val values: immutable.IndexedSeq[SqlCommand] = findValues

  //---- SELECT Statements--------------------------------------------------------------------------------------------------------------------------------
  case object SELECT extends SqlCommand(
    "select",
    "(SELECT\\s+.+)"
  )


  //----CREATE Statements--------------------------------------------------------------------------------------------------------------------------------

  /**
   * <pre>
   * CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
   * (
   * { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
   * [ <watermark_definition> ]
   * [ <table_constraint> ][ , ...n]
   * )
   * [COMMENT table_comment]
   * [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
   * WITH (key1=val1, key2=val2, ...)
   * [ LIKE source_table [( <like_options> )] ]
   * </pre
   */
  case object CREATE_TABLE extends SqlCommand(
    "create table",
    "(CREATE\\s+(TEMPORARY\\s+|)TABLE\\s+.+)"
  )

  /**
   * <pre>
   * CREATE CATALOG catalog_name
   * WITH (key1=val1, key2=val2, ...)
   * </pre>
   */
  case object CREATE_CATALOG extends SqlCommand(
    "create catalog",
    "(CREATE\\s+CATALOG\\s+.+)"
  )

  /**
   * <pre>
   * CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name<br>
   * [COMMENT database_comment]<br>
   * WITH (key1=val1, key2=val2, ...)<br>
   * </pre>
   */
  case object CREATE_DATABASE extends SqlCommand(
    "create database",
    "(CREATE\\s+DATABASE\\s+.+)"
  )


  /**
   * <pre>
   * CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
   * [( columnName [, columnName ]* )] [COMMENT view_comment]
   * AS query_expression<
   * </pre
   */
  case object CREATE_VIEW extends SqlCommand(
    "create view",
    "(CREATE\\s+(TEMPORARY\\s+|)VIEW\\s+(\\S+)\\s+AS\\s+SELECT\\s+.+)"
  )

  /**
   * <pre>
   * CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
   * [IF NOT EXISTS] [catalog_name.][db_name.]function_name
   * AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
   * </pre
   */
  case object CREATE_FUNCTION extends SqlCommand(
    "create function",
    "(CREATE\\s+(TEMPORARY\\s+|TEMPORARY\\s+SYSTEM\\s+|)FUNCTION\\s+(IF\\s+NOT\\s+EXISTS\\s+|)(.*)\\s+AS\\s+.*)"
  )


  //----DROP Statements--------------------------------------------------------------------------------------------------------------------------------

  /**
   * <pre>
   * DROP statements are used to remove a catalog with the given catalog name or to remove a registered table/view/function from the current or specified Catalog.
   *
   * Flink SQL supports the following DROP statements for now:
   * * DROP CATALOG
   * * DROP TABLE
   * * DROP DATABASE
   * * DROP VIEW
   * * DROP FUNCTION
   * </pre>
   */

  /**
   * <strong>DROP CATALOG [IF EXISTS] catalog_name</strong>
   */
  case object DROP_CATALOG extends SqlCommand(
    "drop catalog",
    "(DROP\\s+CATALOG\\s+.+)"
  )

  /**
   * <strong>DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name</strong>
   */
  case object DROP_TABLE extends SqlCommand(
    "drop table",
    "(DROP\\s+(TEMPORARY\\s+|)TABLE\\s+.+)"
  )

  /**
   * <strong>DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]</strong>
   */
  case object DROP_DATABASE extends SqlCommand(
    "drop database",
    "(DROP\\s+DATABASE\\s+.+)"
  )

  /**
   * <strong>DROP [TEMPORARY] VIEW  [IF EXISTS] [catalog_name.][db_name.]view_name</strong>
   */
  case object DROP_VIEW extends SqlCommand(
    "drop view",
    "(DROP\\s+(TEMPORARY\\s+|)VIEW\\s+.+)"
  )

  /**
   * <strong>DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name</strong>
   */
  case object DROP_FUNCTION extends SqlCommand(
    "drop function",
    "(DROP\\s+(TEMPORARY\\s+|TEMPORARY\\s+SYSTEM\\s+|)FUNCTION\\s+.+)"
  )


  //----ALTER Statements--------------------------------------------------------------------------------------------------------------------------------

  /**
   * <strong>ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name</strong>
   *
   * <strong>ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)</strong>
   */
  case object ALTER_TABLE extends SqlCommand(
    "alter table",
    "(ALTER\\s+TABLE\\s+.+)"
  )

  /**
   * <strong>ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name</strong>
   *
   * <strong>ALTER VIEW [catalog_name.][db_name.]view_name AS new_query_expression</strong>
   */
  case object ALTER_VIEW extends SqlCommand(
    "alter view",
    "(ALTER\\s+VIEW\\s+.+)"
  )

  /**
   * <strong>ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)</strong>
   */
  case object ALTER_DATABASE extends SqlCommand(
    "alter database",
    "(ALTER\\s+DATABASE\\s+.+)"
  )

  /**
   * <strong>
   * ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
   * [IF EXISTS] [catalog_name.][db_name.]function_name
   * AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
   * </strong>
   */
  case object ALTER_FUNCTION extends SqlCommand(
    "alter function",
    "(ALTER\\s+(TEMPORARY\\s+|TEMPORARY\\s+SYSTEM\\s+|)FUNCTION\\s+.+)"
  )


  //---- INSERT Statement--------------------------------------------------------------------------------------------------------------------------------

  /**
   * INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] [column_list] select_statement
   * INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]
   */
  case object INSERT extends SqlCommand(
    "insert",
    "(INSERT\\s+(INTO|OVERWRITE)\\s+.+)"
  )

  //---- DESCRIBE Statement--------------------------------------------------------------------------------------------------------------------------------

  /**
   * { DESCRIBE | DESC } [catalog_name.][db_name.]table_name
   */
  case object DESC extends SqlCommand(
    "desc",
    "(DESC\\s+.+)"
  )

  /**
   * { DESCRIBE | DESC } [catalog_name.][db_name.]table_name
   */
  case object DESCRIBE extends SqlCommand(
    "describe",
    "(DESCRIBE\\s+.+)"
  )


  //---- EXPLAIN Statement--------------------------------------------------------------------------------------------------------------------------------

  /**
   * For flink-1.13.x：EXPLAIN PLAN FOR `<query_statement_or_insert_statement>` <br>
   * For flink-1.14.x：EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN `<query_statement_or_insert_statement>`<br>
   * For flink-1.15.x：<br>
   * <pre>
   * EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] <query_statement_or_insert_statement_or_statement_set>
   *
   * statement_set:
   * EXECUTE STATEMENT SET
   * BEGIN
   * insert_statement;
   * ...
   * insert_statement;
   * END;
   * </pre>
   * Recommended not to use the form of flink-1.15.x
   */
  case object EXPLAIN extends SqlCommand(
    "explain",
    "(EXPLAIN\\s+.+)"
  )


  //---- USE Statements--------------------------------------------------------------------------------------------------------------------------------

  /**
   * USE CATALOG catalog_name
   */
  case object USE_CATALOG extends SqlCommand(
    "use catalog",
    "(USE\\s+CATALOG\\s+.+)"
  )

  /**
   * USE MODULES module_name1[, module_name2, ...]
   */
  case object USE_MODULES extends SqlCommand(
    "use modules",
    "(USE\\s+MODULES\\s+.+)"
  )

  /**
   * USE [catalog_name.]database_name
   */
  case object USE_DATABASE extends SqlCommand(
    "use database",
    "(USE\\s+(?!(CATALOG|MODULES)).+)"
  )

  //----SHOW Statements--------------------------------------------------------------------------------------------------------------------------------

  /**
   * SHOW CATALOGS
   */
  case object SHOW_CATALOGS extends SqlCommand(
    "show catalogs",
    "(SHOW\\s+CATALOGS\\s*)"
  )

  /**
   * SHOW CURRENT CATALOG
   */
  case object SHOW_CURRENT_CATALOG extends SqlCommand(
    "show current catalog",
    "(SHOW\\s+CURRENT\\s+CATALOG\\s*)"
  )

  /**
   * SHOW DATABASES
   */
  case object SHOW_DATABASES extends SqlCommand(
    "show databases",
    "(SHOW\\s+DATABASES\\s*)"
  )

  /**
   * SHOW CURRENT DATABASE
   */
  case object SHOW_CURRENT_DATABASE extends SqlCommand(
    "show current database",
    "(SHOW\\s+CURRENT\\s+DATABASE\\s*)"
  )

  /**
   * SHOW TABLES,support from flink-1.13.x<br>
   * SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE `<sql_like_pattern`> ],support from flink-1.15.x
   */
  case object SHOW_TABLES extends SqlCommand(
    "show tables",
    "(SHOW\\s+TABLES.*)"
  )

  /**
   * SHOW CREATE TABLE，flink-1.14.x support.
   */
  case object SHOW_CREATE_TABLE extends SqlCommand(
    "show create table",
    "(SHOW\\s+CREATE\\s+TABLE\\s+.+)"
  )

  /**
   * SHOW COLUMNS ( FROM | IN ) [`[`catalog_name.]database.]`<table_name>` [ [NOT] LIKE `<sql_like_pattern>`]，flink-1.15.x support.
   */
  case object SHOW_COLUMNS extends SqlCommand(
    "show columns",
    "(SHOW\\s+COLUMNS\\s+.+)"
  )

  /**
   * SHOW VIEWS
   */
  case object SHOW_VIEWS extends SqlCommand(
    "show views",
    "(SHOW\\s+VIEWS\\s*)"
  )

  /**
   * SHOW CREATE VIEW
   */
  case object SHOW_CREATE_VIEW extends SqlCommand(
    "show create view",
    "(SHOW\\s+CREATE\\s+VIEW\\s+.+)"
  )

  /**
   * SHOW [USER] FUNCTIONS
   */
  case object SHOW_FUNCTIONS extends SqlCommand(
    "show functions",
    "(SHOW\\s+(USER\\s+|)FUNCTIONS\\s*)"
  )

  /**
   * SHOW [FULL] MODULES
   */
  case object SHOW_MODULES extends SqlCommand(
    "show modules",
    "(SHOW\\s+(FULL\\s+|)MODULES\\s*)"
  )

  //----LOAD Statements--------------------------------------------------------------------------------------------------------------------------------

  /**
   * LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]
   */
  case object LOAD_MODULE extends SqlCommand(
    "load module",
    "(LOAD\\s+MODULE\\s+.+)"
  )


  //----UNLOAD Statements--------------------------------------------------------------------------------------------------------------------------------

  /**
   * UNLOAD MODULE module_name
   */
  case object UNLOAD_MODULE extends SqlCommand(
    "unload module",
    "(UNLOAD\\s+MODULE\\s+.+)"
  )


  // ----SET Statements--------------------------------------------------------------------------------------------------------------------------------

  /**
   * SET ('key' = 'value')
   */
  case object SET extends SqlCommand(
    "set",
    "SET(\\s+(\\S+)\\s*=(.*))?", {
      case a if a.length < 3 => None
      case a if a.head == null => Some(Array[String](cleanUp(a.head)))
      case a => Some(Array[String](cleanUp(a(1)), cleanUp(a(2))))
    }
  )

  // ----RESET Statements--------------------------------------------------------------------------------------------------------------------------------

  /**
   * RESET ('key')
   */
  case object RESET extends SqlCommand(
    "reset",
    "RESET\\s+'(.*)'"
  )

  /**
   * RESET
   */
  case object RESET_ALL extends SqlCommand(
    "reset all",
    "RESET",
    _ => Some(Array[String]("ALL"))
  )


  // ----INSERT SET Statements--------------------------------------------------------------------------------------------------------------------------------
  /*
   * <pre>
   * SQL Client execute each INSERT INTO statement as a single Flink job. However,
   * this is sometimes not optimal because some part of the pipeline can be reused.
   * SQL Client supports STATEMENT SET syntax to execute a set of SQL statements.
   * This is an equivalent feature with StatementSet in Table API.
   * The STATEMENT SET syntax encloses one or more INSERT INTO statements.
   * All statements in a STATEMENT SET block are holistically optimized and executed as a single Flink job.
   * Joint optimization and execution allows for reusing common intermediate results and can therefore significantly
   * improve the efficiency of executing multiple queries.
   * </pre>
   */

  /**
   * This is SQL Client's syntax，don't use in our platform.
   */
  @deprecated
  case object BEGIN_STATEMENT_SET extends SqlCommand(
    "begin statement set",
    "BEGIN\\s+STATEMENT\\s+SET",
    Converters.NO_OPERANDS
  )

  /**
   * This is SQL Client's syntax，don't use in our platform.
   */
  @deprecated
  case object END_STATEMENT_SET extends SqlCommand(
    "end statement set",
    "END",
    Converters.NO_OPERANDS
  )

  private[this] def cleanUp(sql: String): String = sql.trim.replaceAll("^(['\"])|(['\"])$", "")

}

/**
 * Call of SQL command with operands and command type.
 */
case class SqlCommandCall(lineStart: Int,
                          lineEnd: Int,
                          command: SqlCommand,
                          operands: Array[String],
                          originSql: String) {
}

case class FlinkSqlValidationResult(success: JavaBool = true,
                                    failedType: FlinkSqlValidationFailedType = null,
                                    lineStart: Int = 0,
                                    lineEnd: Int = 0,
                                    errorLine: Int = 0,
                                    errorColumn: Int = 0,
                                    sql: String = null,
                                    exception: String = null
                                   )

case class SqlSegment(start: Int, end: Int, sql: String)

object SqlSplitter {

  private lazy val singleLineCommentPrefixList = Set[String]("--")

  /**
   * Split whole text into multiple sql statements.
   * Two Steps:
   * Step 1, split the whole text into multiple sql statements.
   * Step 2, refine the results. Replace the preceding sql statements with empty lines, so that
   * we can get the correct line number in the parsing error message.
   * e.g:
   * select a from table_1;
   * select a from table_2;
   * select a from table_3;
   * The above text will be splitted into:
   * sql_1: select a from table_1
   * sql_2: \nselect a from table_2
   * sql_3: \n\nselect a from table_3
   *
   * @param sql
   * @return
   */
  def splitSql(sql: String): List[SqlSegment] = {
    val queries = ListBuffer[String]()
    val lastIndex = if (sql != null && sql.nonEmpty) sql.length - 1 else 0
    var query = new StringBuilder

    var multiLineComment = false
    var singleLineComment = false
    var singleQuoteString = false
    var doubleQuoteString = false
    var lineNum: Int = 0
    val lineNumMap = new collection.mutable.HashMap[Int, (Int, Int)]()

    // Whether each line of the record is empty. If it is empty, it is false. If it is not empty, it is true
    val lineDescriptor = {
      val scanner = new Scanner(sql)
      val descriptor = new collection.mutable.HashMap[Int, Boolean]
      var lineNumber = 0
      var startComment = false
      var hasComment = false

      while (scanner.hasNextLine) {
        lineNumber += 1
        val line = scanner.nextLine().trim
        val nonEmpty = line.nonEmpty && !line.startsWith("--")
        if (line.startsWith("/*")) {
          startComment = true
          hasComment = true
        }

        descriptor += lineNumber -> (nonEmpty && !hasComment)

        if (startComment && line.endsWith("*/")) {
          startComment = false
          hasComment = false
        }
      }
      descriptor
    }

    def findStartLine(num: Int): Int = if (num >= lineDescriptor.size || lineDescriptor(num)) num else findStartLine(num + 1)

    def markLineNumber(): Unit = {
      val line = lineNum + 1
      if (lineNumMap.isEmpty) {
        lineNumMap += (0 -> (findStartLine(1) -> line))
      } else {
        val index = lineNumMap.size
        val start = lineNumMap(lineNumMap.size - 1)._2 + 1
        lineNumMap += (index -> (findStartLine(start) -> line))
      }
    }

    for (idx <- 0 until sql.length) {

      if (sql.charAt(idx) == '\n') lineNum += 1

      breakable {
        val ch = sql.charAt(idx)

        // end of single line comment
        if (singleLineComment && (ch == '\n')) {
          singleLineComment = false
          query += ch
          if (idx == lastIndex && query.toString.trim.nonEmpty) {
            // add query when it is the end of sql.
            queries += query.toString
          }
          break()
        }

        // end of multiple line comment
        if (multiLineComment && (idx - 1) >= 0 && sql.charAt(idx - 1) == '/'
          && (idx - 2) >= 0 && sql.charAt(idx - 2) == '*') {
          multiLineComment = false
        }

        // single quote start or end mark
        if (ch == '\'' && !(singleLineComment || multiLineComment)) {
          if (singleQuoteString) {
            singleQuoteString = false
          } else if (!doubleQuoteString) {
            singleQuoteString = true
          }
        }

        // double quote start or end mark
        if (ch == '"' && !(singleLineComment || multiLineComment)) {
          if (doubleQuoteString && idx > 0) {
            doubleQuoteString = false
          } else if (!singleQuoteString) {
            doubleQuoteString = true
          }
        }

        // single line comment or multiple line comment start mark
        if (!singleQuoteString && !doubleQuoteString && !multiLineComment && !singleLineComment && idx < lastIndex) {
          if (isSingleLineComment(sql.charAt(idx), sql.charAt(idx + 1))) {
            singleLineComment = true
          } else if (sql.charAt(idx) == '/' && sql.length > (idx + 2)
            && sql.charAt(idx + 1) == '*' && sql.charAt(idx + 2) != '+') {
            multiLineComment = true
          }
        }

        if (ch == ';' && !singleQuoteString && !doubleQuoteString && !multiLineComment && !singleLineComment) {
          markLineNumber()
          // meet the end of semicolon
          if (query.toString.trim.nonEmpty) {
            queries += query.toString
            query = new StringBuilder
          }
        } else if (idx == lastIndex) {
          markLineNumber()

          // meet the last character
          if (!singleLineComment && !multiLineComment) {
            query += ch
          }

          if (query.toString.trim.nonEmpty) {
            queries += query.toString
            query = new StringBuilder
          }
        } else if (!singleLineComment && !multiLineComment) {
          // normal case, not in single line comment and not in multiple line comment
          query += ch
        } else if (ch == '\n') {
          query += ch
        }
      }
    }

    val refinedQueries = new collection.mutable.HashMap[Int, String]()
    for (i <- queries.indices) {
      val currStatement = queries(i)
      if (isSingleLineComment(currStatement) || isMultipleLineComment(currStatement)) {
        // transform comment line as blank lines
        if (refinedQueries.nonEmpty) {
          val lastRefinedQuery = refinedQueries.last
          refinedQueries(refinedQueries.size - 1) = lastRefinedQuery + extractLineBreaks(currStatement)
        }
      } else {
        var linesPlaceholder = ""
        if (i > 0) {
          linesPlaceholder = extractLineBreaks(refinedQueries(i - 1))
        }
        // add some blank lines before the statement to keep the original line number
        val refinedQuery = linesPlaceholder + currStatement
        refinedQueries += refinedQueries.size -> refinedQuery
      }
    }

    val set = new ListBuffer[SqlSegment]
    refinedQueries.foreach(x => {
      val line = lineNumMap(x._1)
      set += SqlSegment(line._1, line._2, x._2)
    })
    set.toList.sortWith((a, b) => a.start < b.start)
  }

  /**
   * extract line breaks
   *
   * @param text
   * @return
   */
  private[this] def extractLineBreaks(text: String): String = {
    val builder = new StringBuilder
    for (i <- 0 until text.length) {
      if (text.charAt(i) == '\n') {
        builder.append('\n')
      }
    }
    builder.toString
  }

  private[this] def isSingleLineComment(text: String) = text.trim.startsWith("--")

  private[this] def isMultipleLineComment(text: String) = text.trim.startsWith("/*") && text.trim.endsWith("*/")

  /**
   * check single-line comment
   *
   * @param curChar
   * @param nextChar
   * @return
   */
  private[this] def isSingleLineComment(curChar: Char, nextChar: Char): Boolean = {
    var flag = false
    for (singleCommentPrefix <- singleLineCommentPrefixList) {
      if (singleCommentPrefix.length == 1) {
        if (curChar == singleCommentPrefix.charAt(0)) {
          flag = true
        }
      }
      if (singleCommentPrefix.length == 2) {
        if (curChar == singleCommentPrefix.charAt(0) && nextChar == singleCommentPrefix.charAt(1)) {
          flag = true
        }
      }
    }
    flag
  }

}
